Trabalhando com Streams Paralelas

Por Diego Soares
Postado em Abril 2016

Para quem ainda não conhece Streams, é uma API incorporada no Java a partir da versão 8. Ela nos permite processar um fluxo de dados de forma declarativa, onde temos os elementos de entrada, o processamento dos elementos e a preparação para saída. Pense como uma linha de montagem de uma fábrica, onde os produtos/matéria-prima são recebidos, são tratados e então preparados para uso. Para maior proveito do artigo sugiro que conheça o funcionamento da API, pois iremos aprofundar um pouco mais no assunto daqui para frente.

Quando trabalhamos com uma Stream serial com muitos elementos, problemas de performance começam a aparecer durante o seu processamento, logo, pensamos que a solução seria processar a Stream paralelamente, sendo necessário criar threads e dividir o processamento em várias tarefas, realmente seria uma boa ideia, porém não é nenhum um pouco simples controlarmos todo esse processamento da melhor forma possível. Pensando em produtividade e conformidade a API Stream veio pronta para processamento paralelo, basta usarmos e seguirmos suas regras para alcançarmos o resultado desejado.

Uma das características mais relevantes da API de Streams é o que vamos abordar neste artigo, tratamento de concorrência nativa, que é capaz de processar os resultados concorrentemente usando múltiplas threads. Por padrão, o número de threads disponíveis em Stream paralelas é o mesmo que o número de CPUs do seu ambiente (que pode ser obtido por meio do método Runtime.getRuntime().availableProcessors() ).

Criando Streams Paralelas
Streams paralelas podem ser criadas a partir de dois métodos:

  • parallel(): cria uma Stream paralela a partir de uma Stream existente.



Stream<String> stream = Arrays.asList("casa", "janela", "porta").stream();  
Stream<String> streamParela = stream.parallel();


Listagem 1

  • parallelStream(): cria uma Stream paralela a partir de uma coleção. A interface Collection inclui este método que pode ser chamado de qualquer coleção.
Stream<String> streamParelela = Arrays.asList("casa", "janela", "porta").parallelStream();

Listagem 2

Processando Tarefas em Paralelo

Em uma Stream serial podemos garantir a ordem que os elementos serão processados, porém com Streams paralelas não podemos garantir o mesmo. Existe uma versão do método forEach() chamada forEachOrdered(), que força uma Stream paralela a processar os resultados em ordem.





Arrays.asList(1, 2, 3)
 .parallelStream()
 .forEach(System.out::println);

Listagem 3





Arrays.asList(1, 2, 3)
 .parallelStream()
 .forEachOrdered(System.out::println);

Listagem 4

As Streams declaradas nas duas listagens acima possuem o mesmo conteúdo e são paralelas. Na Listagem 3 não conseguimos determinar a ordem que os elementos serão impressos, pois estamos trabalhando com processamento paralelo. Já na Listagem 4, o método forEachOrdered() garante que a Stream paralela será processa serialmente sendo possível garantir a ordem que os elementos serão impressos.

Evitando Operações Stateful

Efeitos colaterais podem aparecer em Streams paralelas se suas expressões lambdas forem stateful. Expressão lambda stateful é aquela que o resultado depende de um estado que pode mudar durante a execução do pipeline.




List<Integer> dados = Collections.synchronizedList(new ArrayList<>()); 
Arrays.asList(1, 2, 3) 
.parallelStream() 
.map(i -> {dados.add(i); return i;}) 
.forEachOrdered(i -> System.out.print(i + " "));
System.out.println();    
for (Integer i : dados) { 
// ordem indeterminada  
System.out.print(i + " "); 
}  


Listagem 5

Para exemplificar, na Listagem 5 foi usada uma expressão lambda stateful no método map de uma Stream paralela. Consideramos está expressão como stateful, pois ela adiciona um elemento na coleção que pode mudar durante a execução paralela, haja vista que mais de um elemento pode ser adicionado por vez, mudando assim seu estado.

Olhando superficialmente pensamos que o código acima imprime duas linhas iguais, porém não é bem assim. Não podemos garantir a ordem que o método map() executa cada elemento da Stream, pois estamos falando de Stream paralelas. Portanto, podemos garantir que o método forEachOrdered irá imprimir na ordem desejada, ou seja, “1 2 3”.

Processando Reduções Paralelas

Desde que a ordem não é garantida em Streams paralelas, métodos como findAny() em Streams paralelas podem resultar em comportamentos inesperados, ou seja, não são predicáveis. Vejamos o exemplo abaixo:



System.out.println(
Arrays.asList("J", "A", "V", "A")
 .stream()
 .findAny().get());  // w   
 
  
System.out.println(
Arrays.asList("J", "A", "V", "A")
 .parallelStream()
 .findAny().get()); // não é predicável


Listagem 6

Na Listagem 6 o resultado do primeiro “println” é “A”, pois o processamento da Stream é serial. Já no segundo “println” não podemos garantir o mesmo, pois a Stream é processada de forma paralela.

Operações de redução em paralelo

Uma operação de redução recebe uma sequência de elementos de entrada e combina-os em um único resultado aplicando repetitivamente uma operação de combinação, tais como encontrar a soma ou máximo de um conjunto de números, ou elementos que se acumulam em uma lista. A API de Stream disponibilizada dois métodos: reduce e collect.

O método reduce

Transforma uma Stream em um único objeto, como por exemplo, uma Stream de String em uma única String com todos os elementos concatenados. Normalmente usamos o método reduce apenas com dois parâmetros, porém para reduções paralelas precisamos de mais um parâmetro, o combinador.

<U> U reduce(U identidade, BiFunction<U, ? super T,U> acumulador, BinaryOperator<U> combinador)

Listagem 7

Para entendermos melhor, vejamos abaixo o papel de cada parâmetro:

  • Identidade: tem dois objetivos ser a “semente” inicial da redução e o valor padrão, caso não haja elementos a processar. O tipo da identidade é quem define qual será o retorno do método reduce, como por exemplo: se a identidade é uma String, então o reduce retornará uma String.
  • Acumulador: recebe dois parâmetros, um resultado parcial e o próximo elemento, combina-os e gera um novo resultado parcial.
  • Combinador: combina dois resultados parciais para produzir um novo resultado parcial. É necessário em reduções paralelas, onde a entrada é particionada e uma realizada uma acumulação parcial para cada partição, e então os resultados parciais são combinados para produzir um resultado final.



System.out.println(
Arrays.asList("J", "A", "V", "A")
 .parallelStream()
 .reduce("", (s1, s2) -> s1 + s2, (s3, s4) -> s3 + s4));

Listagem 8

Aplicando este processo no exemplo acima, foram formadas as Strings parciais “JA” e “VA” e então combinadas, resultando em “JAVA”.
Mais formalmente, os parâmetros do método reduce devem seguir algumas regras para obtermos o resultado desejado de uma Stream paralela, que são:

  • O valor identidade deve ser uma identidade para função do combinador. Isto quer dizer que para todo ucombiner.apply(identidade, u) deve ser igual à u.
  • combinador deve ser associativo e deve ser compatível com a função acumulador: para todo tcombinador.apply(u, acumulador.apply(identidade, t) deve ser igual à acumulador.apply(u, t).

Referências: https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html

Postado por Diego Soares Rodrigues é um experiente profissional de TI relacionadas com os serviços financeiros, automação bancária e ATM. Experiência em C / C ++, Java, Linux, Shell Script, Visual Basic 6, Oracle PL / SQL e PostgreSQL PL / pgSQL. Sólidos conhecimentos de sistemas de integração usando Oracle Advanced Queuing (OAQ), Web Services e mensagens de soquete. Certificações Oracle Certified Associate, Java SE 8 Programador e Oracle Database SQL Certified Expert.

Este artigo foi revisto pela equipe de produtos Oracle e está em conformidade com as normas e práticas para o uso de produtos Oracle.